[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706958#comment-16706958
 ] 

ASF GitHub Bot commented on FLINK-11013:


asfgit closed pull request #7181: [FLINK-11013] [table] Fix distinct aggregates 
for group window in Table API
URL: https://github.com/apache/flink/pull/7181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 84e3f79cec3..c0cfa247bca 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -633,6 +633,8 @@ case class WindowAggregate(
   case aggExpr: Aggregation
 if aggExpr.getSqlAggFunction.requiresOver =>
 failValidation(s"OVER clause is necessary for window functions: 
[${aggExpr.getClass}].")
+  case aggExpr: DistinctAgg =>
+validateAggregateExpression(aggExpr.child)
   // check no nested aggregation exists.
   case aggExpr: Aggregation =>
 aggExpr.children.foreach { child =>
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
index 671f8dd8d1d..afa9f8b0c79 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.stream.table
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, 
SlidingGroupWindow, TumblingGroupWindow}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
@@ -238,4 +239,78 @@ class AggregateTest extends TableTestBase {
 
 util.verifyTable(resultTable, expected)
   }
+
+  @Test
+  def testDistinctAggregateOnTumbleWindow(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Int, Long, String)](
+  "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+val result = table
+  .window(Tumble over 15.minute on 'rowtime as 'w)
+  .groupBy('w)
+  .select('a.count.distinct, 'a.sum)
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "rowtime")
+  ),
+  term("window", TumblingGroupWindow('w, 'rowtime, 90.millis)),
+  term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(a) AS TMP_1")
+)
+
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Int, Long, String)](
+  "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+val result = table
+  .window(Slide over 1.hour every 15.minute on 'rowtime as 'w)
+  .groupBy('w)
+  .select('a.count.distinct, 'a.sum.distinct, 'a.max.distinct)
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "rowtime")
+  ),
+  term("window", SlidingGroupWindow('w, 'rowtime, 360.millis, 
90.millis)),
+  term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(DISTINCT a) AS TMP_1",
+   "MAX(DISTINCT a) AS TMP_2")
+)
+
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Int, Long, String)](
+  "MyTable", 'a, 'b, 'c, 'rowtime.rowtime)
+val result = table
+  .window(Session withGap 15.minute on 'rowtime as 'w)
+  .groupBy('a, 'w)
+  .select('a, 'a.count, 'c.count.distinct)
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "rowtime")
+  ),
+  term("groupBy", "a"),
+  term("window", SessionGroupWindow('w, 'rowtime, 90.millis)),
+  term("select", "a", "COUNT(a) AS TMP_0", 

[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706688#comment-16706688
 ] 

ASF GitHub Bot commented on FLINK-11013:


sunjincheng121 commented on issue #7181: [FLINK-11013] [table] Fix distinct 
aggregates for group window in Table API
URL: https://github.com/apache/flink/pull/7181#issuecomment-443595531
 
 
   @dianfu @walterddr Thanks for the review and update! will merging..


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix distinct aggregates for group window in Table API
> -
>
> Key: FLINK-11013
> URL: https://issues.apache.org/jira/browse/FLINK-11013
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently distinct aggregates does not work on group window in Table API.
> For the following query:
> {code:java}
> val table = util.addTable[(Int, Long, String)](
>   "MyTable",
>   'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> val result = table
>   .window(Tumble over 15.minute on 'rowtime as 'w)
>   .groupBy('w)
>   .select('a.count.distinct, 'a.sum)
> {code}
> The following exception will be thrown:
> {code:java}
> org.apache.flink.table.api.ValidationException: It's not allowed to use an 
> aggregate function as input of another aggregate function
> at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641)
> at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628)
> at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206)
> at 
> org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)

[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702606#comment-16702606
 ] 

ASF GitHub Bot commented on FLINK-11013:


dianfu commented on issue #7181: [FLINK-11013] [table] Fix distinct aggregates 
for group window in Table API
URL: https://github.com/apache/flink/pull/7181#issuecomment-442674317
 
 
   @walterddr Thanks a lot for your review. The comments make sense to me and 
have updated accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix distinct aggregates for group window in Table API
> -
>
> Key: FLINK-11013
> URL: https://issues.apache.org/jira/browse/FLINK-11013
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently distinct aggregates does not work on group window in Table API.
> For the following query:
> {code:java}
> val table = util.addTable[(Int, Long, String)](
>   "MyTable",
>   'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> val result = table
>   .window(Tumble over 15.minute on 'rowtime as 'w)
>   .groupBy('w)
>   .select('a.count.distinct, 'a.sum)
> {code}
> The following exception will be thrown:
> {code:java}
> org.apache.flink.table.api.ValidationException: It's not allowed to use an 
> aggregate function as input of another aggregate function
> at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641)
> at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628)
> at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206)
> at 
> org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> 

[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702565#comment-16702565
 ] 

ASF GitHub Bot commented on FLINK-11013:


walterddr commented on a change in pull request #7181: [FLINK-11013] [table] 
Fix distinct aggregates for group window in Table API
URL: https://github.com/apache/flink/pull/7181#discussion_r237316435
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/DistinctAggregateTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, 
SlidingGroupWindow, TumblingGroupWindow}
+import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class DistinctAggregateTest extends TableTestBase {
+  val util = streamTestUtil()
+  val table = util.addTable[(Int, Long, String)](
+"MyTable",
+'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+  @Test
+  def testDistinctAggregate(): Unit = {
+val result = table
 
 Review comment:
   this case is already covered in "AggregateTest"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix distinct aggregates for group window in Table API
> -
>
> Key: FLINK-11013
> URL: https://issues.apache.org/jira/browse/FLINK-11013
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently distinct aggregates does not work on group window in Table API.
> For the following query:
> {code:java}
> val table = util.addTable[(Int, Long, String)](
>   "MyTable",
>   'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> val result = table
>   .window(Tumble over 15.minute on 'rowtime as 'w)
>   .groupBy('w)
>   .select('a.count.distinct, 'a.sum)
> {code}
> The following exception will be thrown:
> {code:java}
> org.apache.flink.table.api.ValidationException: It's not allowed to use an 
> aggregate function as input of another aggregate function
> at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641)
> at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at 
> 

[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702566#comment-16702566
 ] 

ASF GitHub Bot commented on FLINK-11013:


walterddr commented on a change in pull request #7181: [FLINK-11013] [table] 
Fix distinct aggregates for group window in Table API
URL: https://github.com/apache/flink/pull/7181#discussion_r237316354
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/DistinctAggregateTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
 
 Review comment:
   We can probably add these to 
`org.apache.flink.table.api.stream.table.AggregateTest.scala`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix distinct aggregates for group window in Table API
> -
>
> Key: FLINK-11013
> URL: https://issues.apache.org/jira/browse/FLINK-11013
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently distinct aggregates does not work on group window in Table API.
> For the following query:
> {code:java}
> val table = util.addTable[(Int, Long, String)](
>   "MyTable",
>   'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> val result = table
>   .window(Tumble over 15.minute on 'rowtime as 'w)
>   .groupBy('w)
>   .select('a.count.distinct, 'a.sum)
> {code}
> The following exception will be thrown:
> {code:java}
> org.apache.flink.table.api.ValidationException: It's not allowed to use an 
> aggregate function as input of another aggregate function
> at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641)
> at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628)
> at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206)
> at 
> org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> 

[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701481#comment-16701481
 ] 

ASF GitHub Bot commented on FLINK-11013:


sunjincheng121 commented on issue #7181: [FLINK-11013] [table] Fix distinct 
aggregates for group window in Table API
URL: https://github.com/apache/flink/pull/7181#issuecomment-442346412
 
 
   @dianfu Thanks for the PR! 
   Good catch! I'll merge the commit after travis gives green.
   Best,
   Jincheng


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix distinct aggregates for group window in Table API
> -
>
> Key: FLINK-11013
> URL: https://issues.apache.org/jira/browse/FLINK-11013
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently distinct aggregates does not work on group window in Table API.
> For the following query:
> {code:java}
> val table = util.addTable[(Int, Long, String)](
>   "MyTable",
>   'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> val result = table
>   .window(Tumble over 15.minute on 'rowtime as 'w)
>   .groupBy('w)
>   .select('a.count.distinct, 'a.sum)
> {code}
> The following exception will be thrown:
> {code:java}
> org.apache.flink.table.api.ValidationException: It's not allowed to use an 
> aggregate function as input of another aggregate function
> at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641)
> at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628)
> at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206)
> at 
> org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> 

[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16700464#comment-16700464
 ] 

ASF GitHub Bot commented on FLINK-11013:


dianfu opened a new pull request #7181: [FLINK-11013] [table] Fix distinct 
aggregates for group window in Table API
URL: https://github.com/apache/flink/pull/7181
 
 
   ## What is the purpose of the change
   
   *This pull request fix the distinct aggregate for group window on table API*
   
   ## Brief change log
   
   *(for example:)*
 - *Update the validate logical in operators for WindowAggregate to make it 
work for distinct*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added tests in DistinctAggregateTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix distinct aggregates for group window in Table API
> -
>
> Key: FLINK-11013
> URL: https://issues.apache.org/jira/browse/FLINK-11013
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently distinct aggregates does not work on group window in Table API.
> For the following query:
> {code:java}
> val table = util.addTable[(Int, Long, String)](
>   "MyTable",
>   'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> val result = table
>   .window(Tumble over 15.minute on 'rowtime as 'w)
>   .groupBy('w)
>   .select('a.count.distinct, 'a.sum)
> {code}
> The following exception will be thrown:
> {code:java}
> org.apache.flink.table.api.ValidationException: It's not allowed to use an 
> aggregate function as input of another aggregate function
> at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641)
> at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
>