[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user chermenin closed the pull request at: https://github.com/apache/flink/pull/3026 ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97317694 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/groupings.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions + +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +abstract sealed class GroupFunction extends Expression { + + override def toString = s"GroupFunction($children)" + + private[flink] def replaceExpression( --- End diff -- Can you add a Scaladoc here (esp. for the parameters)? Please also add some line comments in this method to improve readability. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97299113 --- Diff: docs/dev/table_api.md --- @@ -790,6 +790,40 @@ val result = in.groupBy('a).select('a, 'b.sum as 'd); + GroupingSets + +Similar to a SQL GROUP BY GROUPING SETS clause. A GROUPING SETS expression allows to selectively specify the +set of groups that you want to create within a GROUP BY clause. +{% highlight scala %} +val in = ds.toTable(tableEnv, 'a, 'b, 'c); +val result = in.groupingSets(('a, 'b), 'b, ()).select('a, 'b, 'c.sum as 'd); +{% endhighlight %} + + + + + Cube + +Similar to a SQL GROUP BY CUBE clause. A CUBE expression will generate subtotals for all combinations of the dimensions specified. +{% highlight scala %} +val in = ds.toTable(tableEnv, 'a, 'b, 'c); +val result = in.cube('a, 'b).select('a, 'b, 'c.sum as 'd); +{% endhighlight %} + + + + + Cube + +Similar to a SQL GROUP BY ROLLUP clause. A ROLLUP expression produces group subtotals from right to left and a grand total. --- End diff -- Can you also add an example here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97306898 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -688,4 +726,37 @@ object array { } } +/** + * Grouping function. Similar to a SQL GROUP_ID function. --- End diff -- Please add the same description (with example) as in the documentation. `apply()` should have the same description. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97306369 --- Diff: docs/dev/table_api.md --- @@ -2714,6 +2748,42 @@ ARRAY.element() + + +{% highlight scala %} +groupId() +{% endhighlight %} + + +Returns an integer that uniquely identifies the combination of grouping keys. + + + + + +{% highlight scala %} +ANY.grouping() +grouping(ANY) +{% endhighlight %} + + +Returns 1 if expression is rolled up in the current row's grouping set, 0 otherwise. + + + + + +{% highlight scala %} +ANY.groupingId() +(ANY [, ANY ]*).groupingId() --- End diff -- I think it is enough to be as close as possible to SQL syntax (`groupingId(ANY, ANY, ...)`). ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97314983 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala --- @@ -151,4 +155,35 @@ object ExpressionUtils { rexBuilder.makeExactLiteral(value)) } + /** Computes the rollup of bit sets. +* +* For example, rollup({0}, {1}) --- End diff -- According to the Scala style guide (http://docs.scala-lang.org/style/scaladoc.html): "Use the wiki-style syntax instead of HTML wherever possible." ---
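For illustration, here is a minimal sketch of what a rollup over bit sets could compute. It assumes that the rollup is the sequence of running unions of the inputs, ordered from the full union down to the empty set. `BitSetRollup` is a hypothetical name, and the code uses only Scala's standard `BitSet`, not the `ExpressionUtils` method under review.

```scala
import scala.collection.immutable.BitSet

object BitSetRollup {
  // Assumption: the rollup is every prefix union of the input bit sets,
  // listed from the most detailed grouping down to the grand total (empty set).
  def rollup(bitSets: Seq[BitSet]): Seq[BitSet] =
    bitSets.scanLeft(BitSet.empty)(_ | _).reverse
}
```

Under that assumption, `BitSetRollup.rollup(Seq(BitSet(0), BitSet(1)))` yields `{0, 1}`, `{0}`, and the empty set, in that order.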
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97298808 --- Diff: docs/dev/table_api.md --- @@ -790,6 +790,40 @@ val result = in.groupBy('a).select('a, 'b.sum as 'd); + GroupingSets + +Similar to a SQL GROUP BY GROUPING SETS clause. A GROUPING SETS expression allows to selectively specify the +set of groups that you want to create within a GROUP BY clause. +{% highlight scala %} +val in = ds.toTable(tableEnv, 'a, 'b, 'c); +val result = in.groupingSets(('a, 'b), 'b, ()).select('a, 'b, 'c.sum as 'd); +{% endhighlight %} + + + + + Cube + +Similar to a SQL GROUP BY CUBE clause. A CUBE expression will generate subtotals for all combinations of the dimensions specified. +{% highlight scala %} +val in = ds.toTable(tableEnv, 'a, 'b, 'c); +val result = in.cube('a, 'b).select('a, 'b, 'c.sum as 'd); +{% endhighlight %} + + + + + Cube --- End diff -- `Rollup` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97320762 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala --- @@ -94,7 +94,24 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { child.construct(relBuilder) relBuilder.project( - projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map { --- End diff -- I think this translation should rather happen in `resolveExpression` and needs to be reworked a bit. E.g. `groupId() as 'a as 'f` fails. Some inline comments would also be very good. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97314389 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala --- @@ -176,6 +179,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val atom: PackratParser[Expression] = ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference + lazy val grouped: PackratParser[Expression] = +"(" ~> expressionList <~ ")" ^^ { l => new GroupedExpression(l.toSeq) } + + lazy val unit: PackratParser[Expression] = --- End diff -- Use a different name for this. My IDE shows warnings. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97317531 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/groupings.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions + +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +abstract sealed class GroupFunction extends Expression { + + override def toString = s"GroupFunction($children)" + + private[flink] def replaceExpression( +relBuilder: RelBuilder, +groupExpressions: Option[Seq[Expression]], +children: Seq[Attribute] = Seq(), +indicator: Boolean = false): Expression = { + +if (groupExpressions.isDefined) { + val expressions = groupExpressions.get + if (!indicator) { +Cast( --- End diff -- All arguments here are literals. We don't need to use `Expression` operations. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97314552 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala --- @@ -375,15 +393,26 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val prefixFlattening: PackratParser[Expression] = FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) } + lazy val prefixGrouping: PackratParser[Expression] = +GROUPING ~ "(" ~> composite <~ ")" ^^ { e => Grouping(e) } + + lazy val prefixGroupingId: PackratParser[Expression] = +GROUPING_ID ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { l => GroupingId(l: _*) } + + lazy val prefixGroupId: PackratParser[Expression] = +GROUP_ID ~ opt("()") ^^ { _ => GroupId() } + lazy val prefixed: PackratParser[Expression] = prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | - prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | - prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet | prefixFlattening | + prefixStart | prefixEnd | + prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract | + prefixFloor | prefixCeil | prefixGet | prefixFlattening | + prefixGroupingId | prefixGrouping | prefixGroupId | prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end // suffix/prefix composite - lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom | + lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom | grouped | --- End diff -- The `grouped` in `expression` should be enough. We don't need it here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97312088 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala --- @@ -86,3 +85,44 @@ abstract class UnaryExpression extends Expression { abstract class LeafExpression extends Expression { private[flink] val children = Nil } + +class GroupedExpression( +private[flink] val children: Seq[Expression] + ) extends Expression { + + def this(product: Product) { +this( + product.productIterator +.map { + case e: Expression => e + case s: Symbol => UnresolvedFieldReference(s.name) + case p: Product => new GroupedExpression(p) + case _ => throw new IllegalArgumentException() --- End diff -- Please throw a `TableException` here with a meaningful message. ---
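As a rough sketch of what this request could look like, the conversion below reports the offending element instead of throwing a bare `IllegalArgumentException`. Everything here is a local stand-in so the snippet compiles on its own: `TableException`, `Expr`, `FieldRef`, `Grouped`, and `GroupedConversion` are hypothetical simplifications, not the Flink classes from the diff, and the message wording is only a suggestion.

```scala
// Local stand-ins so the sketch compiles without Flink on the classpath.
final case class TableException(msg: String) extends RuntimeException(msg)

sealed trait Expr
final case class FieldRef(name: String) extends Expr
final case class Grouped(children: Seq[Expr]) extends Expr

object GroupedConversion {
  // Recursively converts a (possibly nested) tuple into a Grouped expression.
  // Expr is matched before Product because case classes are Products too.
  def fromProduct(product: Product): Grouped =
    Grouped(product.productIterator.map {
      case e: Expr    => e
      case s: Symbol  => FieldRef(s.name)
      case p: Product => fromProduct(p)
      case other =>
        throw TableException(
          s"Unsupported grouping element '$other'. " +
            "Expected an expression, a symbol, or a tuple of those.")
    }.toList) // toList forces evaluation, so the error surfaces immediately
}
```

Calling `GroupedConversion.fromProduct((Symbol("a"), 42))` then fails with a message that names `42` as the unsupported element.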
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97311360 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -928,5 +1032,151 @@ class GroupWindowedTable( val fieldExprs = ExpressionParser.parseExpressionList(fields) select(fieldExprs: _*) } +} + +/** + * A table that has been grouped on several sets of grouping keys. + */ +class GroupingSetsTable( --- End diff -- In order to reduce the code complexity, we should adapt `GroupedTable` and do a case distinction there. Four kinds of grouped tables are too many. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97313779 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala --- @@ -375,15 +393,26 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val prefixFlattening: PackratParser[Expression] = FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) } + lazy val prefixGrouping: PackratParser[Expression] = --- End diff -- If you add `grouping`, `groupingId`, and `groupId` to `FunctionCatalog.builtInFunctions`, we don't need the following lines. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97324083 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -408,10 +408,20 @@ public static void compareKeyValuePairsWithDelta(String expectedLines, String re } } + public static > void compareResultCollections(List expected, List actual) { --- End diff -- Please try to avoid modifying classes outside of `flink-table` unless really necessary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97307329 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -15,14 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.table.api import org.apache.calcite.rel.RelNode +import org.apache.calcite.sql.SqlKind +import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.plan.logical.Minus -import org.apache.flink.table.expressions.{Alias, Asc, Call, Expression, ExpressionParser, Ordering, TableFunctionCall, UnresolvedAlias} +import org.apache.flink.table.plan.logical.{Minus => MinusNode} +import org.apache.flink.table.expressions._ --- End diff -- Is this import and the renaming of Minus really necessary? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97323770 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupingSetsTest.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.batch.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment} +import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.types.Row +import org.junit._ + +import scala.collection.JavaConverters._ + +class GroupingSetsTest { --- End diff -- Because we need to reduce the number of ITCases to improve the build time, I propose a different test structure: add an `org.apache.flink.table.runtime.dataset.DataSetAggregateITCase` that tests grouping sets (not cube or rollup) in the Scala Table API. The remaining tests can be logical unit tests for the Java Table API, Scala Table API, and SQL API.
You can find additional information about how we test here: https://issues.apache.org/jira/browse/FLINK-3656 ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97312771 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala --- @@ -86,3 +85,44 @@ abstract class UnaryExpression extends Expression { abstract class LeafExpression extends Expression { private[flink] val children = Nil } + +class GroupedExpression( +private[flink] val children: Seq[Expression] + ) extends Expression { + + def this(product: Product) { +this( + product.productIterator +.map { + case e: Expression => e + case s: Symbol => UnresolvedFieldReference(s.name) + case p: Product => new GroupedExpression(p) + case _ => throw new IllegalArgumentException() +}.toSeq +) + } + + def flatChildren: Seq[Expression] = { +children.flatMap { + case g: GroupedExpression => g.flatChildren + case x => Seq(x) +} + } + + /** +* Returns the [[TypeInformation]] for evaluating this expression. +* It's not applicable for grouped expressions. +*/ + override private[flink] def resultType = ??? + + /** +* Grouping function. Similar to a SQL GROUPING_ID function. +*/ + def groupingId(): Expression = GroupingId(children: _*) --- End diff -- Remove this function as mentioned previously. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97304886 --- Diff: docs/dev/table_api.md --- @@ -2714,6 +2748,42 @@ ARRAY.element() + --- End diff -- Can you add a `Grouping functions` section at the same position as in the SQL functions? ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97312920 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala --- @@ -86,3 +85,44 @@ abstract class UnaryExpression extends Expression { abstract class LeafExpression extends Expression { private[flink] val children = Nil } + +class GroupedExpression( +private[flink] val children: Seq[Expression] + ) extends Expression { + + def this(product: Product) { +this( + product.productIterator +.map { + case e: Expression => e + case s: Symbol => UnresolvedFieldReference(s.name) + case p: Product => new GroupedExpression(p) + case _ => throw new IllegalArgumentException() +}.toSeq +) + } + + def flatChildren: Seq[Expression] = { +children.flatMap { + case g: GroupedExpression => g.flatChildren + case x => Seq(x) +} + } + + /** +* Returns the [[TypeInformation]] for evaluating this expression. +* It's not applicable for grouped expressions. +*/ + override private[flink] def resultType = ??? + + /** +* Grouping function. Similar to a SQL GROUPING_ID function. +*/ + def groupingId(): Expression = GroupingId(children: _*) + + override def productElement(n: Int): Expression = children(n) --- End diff -- Are these `Product` functions not implemented already in super classes? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97306096 --- Diff: docs/dev/table_api.md --- @@ -2714,6 +2748,42 @@ ARRAY.element() + + +{% highlight scala %} +groupId() +{% endhighlight %} + + +Returns an integer that uniquely identifies the combination of grouping keys. + + + + + +{% highlight scala %} +ANY.grouping() +grouping(ANY) --- End diff -- In order to reduce the code complexity, I think it is enough if we only support the `grouping(...)` syntax. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97322915 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala --- @@ -616,6 +706,118 @@ case class WindowAggregate( } } +case class GroupingWindowAggregate( --- End diff -- Please remove this class for now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97313160 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala --- @@ -289,12 +298,21 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val suffixFlattening: PackratParser[Expression] = composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) } + lazy val suffixGrouping: PackratParser[Expression] = --- End diff -- Only support prefix notation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97298700 --- Diff: docs/dev/table_api.md --- @@ -790,6 +790,40 @@ val result = in.groupBy('a).select('a, 'b.sum as 'd); + GroupingSets + +Similar to a SQL GROUP BY GROUPING SETS clause. A GROUPING SETS expression allows to selectively specify the +set of groups that you want to create within a GROUP BY clause. +{% highlight scala %} +val in = ds.toTable(tableEnv, 'a, 'b, 'c); +val result = in.groupingSets(('a, 'b), 'b, ()).select('a, 'b, 'c.sum as 'd); +{% endhighlight %} + + + + + Cube + +Similar to a SQL GROUP BY CUBE clause. A CUBE expression will generate subtotals for all combinations of the dimensions specified. --- End diff -- Would be great if you could also add an example. E.g. `.cube('a, 'b)` is equivalent to `.groupingSets(...)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
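To make the requested equivalence concrete, the following sketch expands a list of cube dimensions into the grouping sets it stands for. It is plain Scala over generic keys and does not touch the Table API; `CubeExpansion` is a hypothetical helper name, and the ordering of the resulting sets is a choice made here for readability.

```scala
object CubeExpansion {
  // Every subset of the keys, with the full key list first and the
  // empty grouping set (the grand total) last.
  def cube[A](keys: Seq[A]): Seq[Seq[A]] =
    keys.foldRight(Seq(Seq.empty[A])) { (key, subsets) =>
      subsets.map(key +: _) ++ subsets
    }
}
```

For the keys `a` and `b` this produces `(a, b)`, `(a)`, `(b)`, and `()`. In the syntax proposed in this pull request, `.cube('a, 'b)` would therefore correspond to `.groupingSets(('a, 'b), 'a, 'b, ())`.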
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97305200 --- Diff: docs/dev/table_api.md --- @@ -2714,6 +2748,42 @@ ARRAY.element() + + +{% highlight scala %} +groupId() +{% endhighlight %} + + +Returns an integer that uniquely identifies the combination of grouping keys. + + + + + +{% highlight scala %} +ANY.grouping() +grouping(ANY) +{% endhighlight %} + + +Returns 1 if expression is rolled up in the current row's grouping set, 0 otherwise. + + + + + +{% highlight scala %} +ANY.groupingId() +(ANY [, ANY ]*).groupingId() +groupingId(ANY [, ANY ]*) +{% endhighlight %} + + +Returns a bit vector of the given grouping expressions. + + + --- End diff -- Add the same documentation to the Java section. ---
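The descriptions of `grouping` and `groupingId` above can be sketched in a few lines of plain Scala. This is an illustration of the semantics only, not the implementation in the pull request: `GroupingBits` is a hypothetical name, a grouping set is modelled as a `Set` of keys, and the bit order (first argument as the most significant bit) is assumed to follow SQL's `GROUPING_ID`.

```scala
object GroupingBits {
  // 1 if the key is rolled up (absent from the current grouping set), else 0.
  def grouping[A](key: A, currentSet: Set[A]): Int =
    if (currentSet.contains(key)) 0 else 1

  // Concatenates the grouping bits of all keys into one integer.
  // Assumption: the first key becomes the most significant bit.
  def groupingId[A](keys: Seq[A], currentSet: Set[A]): Int =
    keys.foldLeft(0)((bits, key) => (bits << 1) | grouping(key, currentSet))
}
```

With keys `a` and `b`, a row from the grouping set `(a)` has `grouping(b) = 1` and `groupingId(a, b) = 1`, while a grand-total row from the empty grouping set has `groupingId(a, b) = 3`.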
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97322831 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala --- @@ -264,6 +281,79 @@ case class Aggregate( } } +case class GroupingAggregation( --- End diff -- I wonder if we could merge this class with `Aggregate`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97310709 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -928,5 +1032,151 @@ class GroupWindowedTable( val fieldExprs = ExpressionParser.parseExpressionList(fields) select(fieldExprs: _*) } +} + +/** + * A table that has been grouped on several sets of grouping keys. + */ +class GroupingSetsTable( + private[flink] val table: Table, + private[flink] val groups: Seq[Seq[Expression]], + private[flink] val sqlKind: SqlKind) { --- End diff -- I think we don't need this parameter if the translation is done in `cube()` and `rollup()` already.
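One way to read this suggestion: if `rollup()` (and likewise `cube()`) expands its arguments into explicit grouping sets at call time, the grouped table only needs to store the resulting sets and no `sqlKind`. The sketch below illustrates that idea with hypothetical stand-ins (`GroupedSketch`, `TableSketch`) and string keys; it is not the PR's `Table` or `GroupingSetsTable` code.

```scala
// Hypothetical, simplified stand-ins for the PR's Table and GroupingSetsTable.
// The grouped table carries only the explicit grouping sets, no SqlKind.
final case class GroupedSketch(groups: Seq[Seq[String]])

object TableSketch {

  // Explicit grouping sets are stored exactly as given.
  def groupingSets(groups: Seq[String]*): GroupedSketch =
    GroupedSketch(groups.toList)

  // rollup() translates eagerly: every prefix of the keys, longest first.
  def rollup(keys: String*): GroupedSketch =
    GroupedSketch(keys.toList.inits.toList)
}
```

With this shape, `select` would no longer need to match on the kind of grouping, because every entry point already produces plain grouping sets.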
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97311604 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -928,5 +1032,151 @@ class GroupWindowedTable( val fieldExprs = ExpressionParser.parseExpressionList(fields) select(fieldExprs: _*) } +} + +/** + * A table that has been grouped on several sets of grouping keys. + */ +class GroupingSetsTable( + private[flink] val table: Table, + private[flink] val groups: Seq[Seq[Expression]], + private[flink] val sqlKind: SqlKind) { + + /** +* Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. +* The field expressions can contain complex expressions and aggregations. +* +* Example: +* +* {{{ +* tab.groupingSets('key).select('key, 'value.avg + " The average" as 'average) +* }}} +*/ + def select(fields: Expression*): Table = { + +val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv) + +if (propNames.nonEmpty) { + throw ValidationException("Window properties can only be used on windowed tables.") +} + +val groupingSets = sqlKind match { + case SqlKind.CUBE => ExpressionUtils.cube(groups) + case SqlKind.ROLLUP => ExpressionUtils.rollup(groups) + case _ => groups +} + +val projectsOnAgg = replaceAggregationsAndProperties( + fields, table.tableEnv, aggNames, propNames) +val projectFields = extractFieldReferences(fields ++ groupingSets.flatten.distinct) + +val logical = + Project(projectsOnAgg, +GroupingAggregation(groupingSets, aggNames.map(a => Alias(a._1, a._2)).toSeq, +Project(projectFields, table.logicalPlan).validate(table.tableEnv) +).validate(table.tableEnv) + ).validate(table.tableEnv) + +new Table(table.tableEnv, logical) + } + + /** +* Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. +* The field expressions can contain complex expressions and aggregations. 
+* +* Example: +* +* {{{ +* tab.groupBy("key").select("key, value.avg + ' The average' as average") +* }}} +*/ + def select(fields: String): Table = { +val fieldExprs = ExpressionParser.parseExpressionList(fields) +select(fieldExprs: _*) + } + /** +* Groups the records of a table by assigning them to windows defined by a time or row interval. +* +* For streaming tables of infinite size, grouping into windows is required to define finite +* groups on which group-based aggregates can be computed. +* +* For batch tables of finite size, windowing essentially provides shortcuts for time-based +* groupBy. +* +* @param groupWindow group-window that specifies how elements are grouped. +* @return A windowed table. +*/ + def window(groupWindow: GroupWindow): GroupingSetsWindowedTable = { +if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) { + throw new ValidationException(s"Windows on batch tables are currently not supported.") +} +new GroupingSetsWindowedTable(table, groups, sqlKind, groupWindow) + } +} + +class GroupingSetsWindowedTable( --- End diff -- We decided not to support grouping sets in a stream environment yet, and there are also no tests for it. Could you remove this class?
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97312299 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala --- @@ -86,3 +85,44 @@ abstract class UnaryExpression extends Expression { abstract class LeafExpression extends Expression { private[flink] val children = Nil } + +class GroupedExpression( +private[flink] val children: Seq[Expression] + ) extends Expression { + + def this(product: Product) { +this( + product.productIterator +.map { + case e: Expression => e + case s: Symbol => UnresolvedFieldReference(s.name) + case p: Product => new GroupedExpression(p) + case _ => throw new IllegalArgumentException() +}.toSeq +) + } + + def flatChildren: Seq[Expression] = { +children.flatMap { + case g: GroupedExpression => g.flatChildren + case x => Seq(x) +} + } + + /** +* Returns the [[TypeInformation]] for evaluating this expression. +* It's not applicable for grouped expressions. +*/ + override private[flink] def resultType = ??? --- End diff -- This should be an `UnsupportedOperationException` with a meaningful message.
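A minimal sketch of the requested change, using simplified hypothetical stand-ins (`Expr`, `GroupedExpr`) rather than the PR's `Expression` and `GroupedExpression` classes; the exception message is only a suggestion. Note that `???` throws `scala.NotImplementedError`, which signals unfinished code rather than an unsupported operation.

```scala
// Simplified stand-ins for the PR's classes; names and types are illustrative only.
trait Expr {
  def resultType: String
}

final class GroupedExpr(val children: Seq[Expr]) extends Expr {
  // A grouped expression only bundles other expressions and is never evaluated,
  // so asking for its result type is reported as an unsupported operation.
  override def resultType: String =
    throw new UnsupportedOperationException(
      "A grouped expression has no result type because it is never evaluated directly.")
}
```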
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3026#discussion_r97308739 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala --- @@ -66,7 +66,7 @@ import _root_.scala.reflect.ClassTag * }}} * */ -package object scala extends ImplicitExpressionConversions { +package object scala extends ImplicitExpressionConversions with ImplicitGroupedConversions { --- End diff -- You can add the contents of `ImplicitGroupedConversions` to `ImplicitExpressionConversions`. We don't need an additional class.
[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...
GitHub user chermenin opened a pull request: https://github.com/apache/flink/pull/3026 [FLINK-2980] [table] Support for GROUPING SETS clause in Table API. This PR adds support for the GROUPING SETS / ROLLUP / CUBE operators to the Table API. It also adds tests that check the execution of SQL queries using them and improves the documentation. This PR closes the following issue: https://issues.apache.org/jira/browse/FLINK-2980. This PR must be reviewed and merged only after PR #2976. You can merge this pull request into a Git repository by running: $ git pull https://github.com/chermenin/flink flink-2980 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3026.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3026 commit d88bd67c82249a2533d58b1ff231adb1a441e6a3 Author: Alexander Chermenin Date: 2016-12-10T20:09:25Z [FLINK-2980] Base implementation of grouping sets. commit 70c12a240b85333ab3a801e2f4f6ee6d1e961427 Author: Aleksandr Chermenin Date: 2016-12-12T08:58:17Z [FLINK-2980] Implemented grouped expressions. commit de15fdddb35fac5002ae76ec9cec6aa14c9ffef0 Author: Aleksandr Chermenin Date: 2016-12-12T10:18:42Z [FLINK-2980] Added grouping functions. commit 5468db9fd39c26e752554445af5cd2a0e1da5aae Author: Aleksandr Chermenin Date: 2016-12-12T11:39:09Z [FLINK-2980] Improved expressions parser. commit e6aa37255695e0d23d7df43934a88ff53737d303 Author: Aleksandr Chermenin Date: 2016-12-12T12:05:25Z [FLINK-2980] Added support for grouping functions. commit a3d30f39b91ee85d76da4e034b20e6f63ef50fd7 Author: Aleksandr Chermenin Date: 2016-12-12T12:54:19Z [FLINK-2980] Small fixes. commit 8e8d7e16e12c3f50c4a93a1961e975860fb0e4a5 Author: Aleksandr Chermenin Date: 2016-12-12T13:32:57Z [FLINK-2980] Windowed table with grouping sets.
commit b13382b3328e1b244dae624c45c5c01580d80098 Author: Aleksandr Chermenin Date: 2016-12-15T10:13:45Z [FLINK-2980] Added tests. commit 6f5371882bf354b73587d4a7f9e526c911794eba Author: Aleksandr Chermenin Date: 2016-12-15T10:27:44Z [FLINK-2980] Small fixes. commit 36ce2bf704a8798f398719b634c30003765c7bbf Author: Aleksandr Chermenin Date: 2016-12-16T08:12:07Z [FLINK-2980] Improved documentation. commit 8d2824f37ebb7215fb901fc10a868717f424eb56 Author: Aleksandr Chermenin Date: 2016-12-16T09:42:30Z [FLINK-2980] Small docs improvements.