[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API
[ https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706958#comment-16706958 ] ASF GitHub Bot commented on FLINK-11013: asfgit closed pull request #7181: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API URL: https://github.com/apache/flink/pull/7181 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 84e3f79cec3..c0cfa247bca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -633,6 +633,8 @@ case class WindowAggregate( case aggExpr: Aggregation if aggExpr.getSqlAggFunction.requiresOver => failValidation(s"OVER clause is necessary for window functions: [${aggExpr.getClass}].") + case aggExpr: DistinctAgg => +validateAggregateExpression(aggExpr.child) // check no nested aggregation exists. 
case aggExpr: Aggregation => aggExpr.children.foreach { child => diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala index 671f8dd8d1d..afa9f8b0c79 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ +import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils.TableTestBase @@ -238,4 +239,78 @@ class AggregateTest extends TableTestBase { util.verifyTable(resultTable, expected) } + + @Test + def testDistinctAggregateOnTumbleWindow(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Int, Long, String)]( + "MyTable", 'a, 'b, 'c, 'rowtime.rowtime) +val result = table + .window(Tumble over 15.minute on 'rowtime as 'w) + .groupBy('w) + .select('a.count.distinct, 'a.sum) + +val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( +"DataStreamCalc", +streamTableNode(0), +term("select", "a", "rowtime") + ), + term("window", TumblingGroupWindow('w, 'rowtime, 90.millis)), + term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(a) AS TMP_1") +) + +util.verifyTable(result, expected) + } + + @Test + def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Int, Long, String)]( + "MyTable", 'a, 'b, 'c, 'rowtime.rowtime) +val result = table + 
.window(Slide over 1.hour every 15.minute on 'rowtime as 'w) + .groupBy('w) + .select('a.count.distinct, 'a.sum.distinct, 'a.max.distinct) + +val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( +"DataStreamCalc", +streamTableNode(0), +term("select", "a", "rowtime") + ), + term("window", SlidingGroupWindow('w, 'rowtime, 360.millis, 90.millis)), + term("select", "COUNT(DISTINCT a) AS TMP_0", "SUM(DISTINCT a) AS TMP_1", + "MAX(DISTINCT a) AS TMP_2") +) + +util.verifyTable(result, expected) + } + + @Test + def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Int, Long, String)]( + "MyTable", 'a, 'b, 'c, 'rowtime.rowtime) +val result = table + .window(Session withGap 15.minute on 'rowtime as 'w) + .groupBy('a, 'w) + .select('a, 'a.count, 'c.count.distinct) + +val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( +"DataStreamCalc", +streamTableNode(0), +term("select", "a", "c", "rowtime") + ), + term("groupBy", "a"), + term("window", SessionGroupWindow('w, 'rowtime, 90.millis)), + term("select", "a", "COUNT(a) AS TMP_0",
[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API
[ https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706688#comment-16706688 ] ASF GitHub Bot commented on FLINK-11013: sunjincheng121 commented on issue #7181: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API URL: https://github.com/apache/flink/pull/7181#issuecomment-443595531 @dianfu @walterddr Thanks for the review and update! Will merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix distinct aggregates for group window in Table API > - > > Key: FLINK-11013 > URL: https://issues.apache.org/jira/browse/FLINK-11013 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently distinct aggregates do not work on group window in Table API. 
> For the following query: > {code:java} > val table = util.addTable[(Int, Long, String)]( > "MyTable", > 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) > val result = table > .window(Tumble over 15.minute on 'rowtime as 'w) > .groupBy('w) > .select('a.count.distinct, 'a.sum) > {code} > The following exception will be thrown: > {code:java} > org.apache.flink.table.api.ValidationException: It's not allowed to use an > aggregate function as input of another aggregate function > at > org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641) > at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at 
scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628) > at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206) > at > org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API
[ https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702606#comment-16702606 ] ASF GitHub Bot commented on FLINK-11013: dianfu commented on issue #7181: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API URL: https://github.com/apache/flink/pull/7181#issuecomment-442674317 @walterddr Thanks a lot for your review. The comments make sense to me and I have updated the PR accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix distinct aggregates for group window in Table API > - > > Key: FLINK-11013 > URL: https://issues.apache.org/jira/browse/FLINK-11013 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently distinct aggregates do not work on group window in Table API. 
> For the following query: > {code:java} > val table = util.addTable[(Int, Long, String)]( > "MyTable", > 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) > val result = table > .window(Tumble over 15.minute on 'rowtime as 'w) > .groupBy('w) > .select('a.count.distinct, 'a.sum) > {code} > The following exception will be thrown: > {code:java} > org.apache.flink.table.api.ValidationException: It's not allowed to use an > aggregate function as input of another aggregate function > at > org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641) > at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at 
scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628) > at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206) > at > org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at >
[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API
[ https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702565#comment-16702565 ] ASF GitHub Bot commented on FLINK-11013: walterddr commented on a change in pull request #7181: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API URL: https://github.com/apache/flink/pull/7181#discussion_r237316435 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/DistinctAggregateTest.scala ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow} +import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode} +import org.apache.flink.table.utils.TableTestBase +import org.junit.Test + +class DistinctAggregateTest extends TableTestBase { + val util = streamTestUtil() + val table = util.addTable[(Int, Long, String)]( +"MyTable", +'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) + + @Test + def testDistinctAggregate(): Unit = { +val result = table Review comment: this case is already covered in "AggregateTest" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix distinct aggregates for group window in Table API > - > > Key: FLINK-11013 > URL: https://issues.apache.org/jira/browse/FLINK-11013 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently distinct aggregates does not work on group window in Table API. 
> For the following query: > {code:java} > val table = util.addTable[(Int, Long, String)]( > "MyTable", > 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) > val result = table > .window(Tumble over 15.minute on 'rowtime as 'w) > .groupBy('w) > .select('a.count.distinct, 'a.sum) > {code} > The following exception will be thrown: > {code:java} > org.apache.flink.table.api.ValidationException: It's not allowed to use an > aggregate function as input of another aggregate function > at > org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641) > at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at >
[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API
[ https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702566#comment-16702566 ] ASF GitHub Bot commented on FLINK-11013: walterddr commented on a change in pull request #7181: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API URL: https://github.com/apache/flink/pull/7181#discussion_r237316354 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/DistinctAggregateTest.scala ## @@ -0,0 +1,119 @@ +/* Review comment: We can probably add these to `org.apache.flink.table.api.stream.table.AggregateTest.scala` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix distinct aggregates for group window in Table API > - > > Key: FLINK-11013 > URL: https://issues.apache.org/jira/browse/FLINK-11013 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently distinct aggregates do not work on group window in Table API. 
> For the following query: > {code:java} > val table = util.addTable[(Int, Long, String)]( > "MyTable", > 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) > val result = table > .window(Tumble over 15.minute on 'rowtime as 'w) > .groupBy('w) > .select('a.count.distinct, 'a.sum) > {code} > The following exception will be thrown: > {code:java} > org.apache.flink.table.api.ValidationException: It's not allowed to use an > aggregate function as input of another aggregate function > at > org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641) > at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at 
scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628) > at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206) > at > org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at >
[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API
[ https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701481#comment-16701481 ] ASF GitHub Bot commented on FLINK-11013: sunjincheng121 commented on issue #7181: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API URL: https://github.com/apache/flink/pull/7181#issuecomment-442346412 @dianfu Thanks for the PR! Good catch! I'll merge the commit after Travis turns green. Best, Jincheng This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix distinct aggregates for group window in Table API > - > > Key: FLINK-11013 > URL: https://issues.apache.org/jira/browse/FLINK-11013 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently distinct aggregates do not work on group window in Table API. 
> For the following query: > {code:java} > val table = util.addTable[(Int, Long, String)]( > "MyTable", > 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) > val result = table > .window(Tumble over 15.minute on 'rowtime as 'w) > .groupBy('w) > .select('a.count.distinct, 'a.sum) > {code} > The following exception will be thrown: > {code:java} > org.apache.flink.table.api.ValidationException: It's not allowed to use an > aggregate function as input of another aggregate function > at > org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641) > at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at 
scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628) > at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206) > at > org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at >
[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API
[ https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16700464#comment-16700464 ] ASF GitHub Bot commented on FLINK-11013: dianfu opened a new pull request #7181: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API URL: https://github.com/apache/flink/pull/7181 ## What is the purpose of the change *This pull request fixes the distinct aggregate for group window on table API* ## Brief change log *(for example:)* - *Update the validation logic in operators for WindowAggregate to make it work for distinct* ## Verifying this change This change added tests and can be verified as follows: *(example:)* - *Added tests in DistinctAggregateTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix distinct aggregates for group window in Table API > - > > Key: FLINK-11013 > URL: https://issues.apache.org/jira/browse/FLINK-11013 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Currently distinct aggregates do not work on group window in Table API. 
> For the following query: > {code:java} > val table = util.addTable[(Int, Long, String)]( > "MyTable", > 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) > val result = table > .window(Tumble over 15.minute on 'rowtime as 'w) > .groupBy('w) > .select('a.count.distinct, 'a.sum) > {code} > The following exception will be thrown: > {code:java} > org.apache.flink.table.api.ValidationException: It's not allowed to use an > aggregate function as input of another aggregate function > at > org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641) > at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654) > at 
scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at > org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628) > at scala.collection.immutable.List.foreach(List.scala:392) > at >