[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-09-05 Thread chermenin
Github user chermenin closed the pull request at:

https://github.com/apache/flink/pull/3026


---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97317694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/groupings.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+abstract sealed class GroupFunction extends Expression {
+
+  override def toString = s"GroupFunction($children)"
+
+  private[flink] def replaceExpression(
--- End diff --

Can you add a Scaladoc here (esp. for the parameters)? Please also add some 
line comments in this method to improve readability.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97299113
  
--- Diff: docs/dev/table_api.md ---
@@ -790,6 +790,40 @@ val result = in.groupBy('a).select('a, 'b.sum as 'd);
 
 
 
+  GroupingSets
+  
+Similar to a SQL GROUP BY GROUPING SETS clause. A GROUPING SETS 
expression allows to selectively specify the
+set of groups that you want to create within a GROUP BY clause.
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.groupingSets(('a, 'b), 'b, ()).select('a, 'b, 'c.sum as 
'd);
+{% endhighlight %}
+  
+
+
+
+  Cube
+  
+Similar to a SQL GROUP BY CUBE clause. A CUBE expression will 
generate subtotals for all combinations of the dimensions specified.
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.cube('a, 'b).select('a, 'b, 'c.sum as 'd);
+{% endhighlight %}
+  
+
+
+
+  Cube
+  
+Similar to a SQL GROUP BY ROLLUP clause. A ROLLUP expression 
produces group subtotals from right to left and a grand total.
--- End diff --

Can you also add an example here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97306898
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -688,4 +726,37 @@ object array {
   }
 }
 
+/**
+  * Grouping function. Similar to a SQL GROUP_ID function.
--- End diff --

Please add the same description (with example) as in the documentation. 
`apply()` should have the same description.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97306369
  
--- Diff: docs/dev/table_api.md ---
@@ -2714,6 +2748,42 @@ ARRAY.element()
   
 
 
+
+  
+{% highlight scala %}
+groupId()
+{% endhighlight %}
+  
+  
+Returns an integer that uniquely identifies the combination of 
grouping keys.
+  
+
+
+
+  
+{% highlight scala %}
+ANY.grouping()
+grouping(ANY)
+{% endhighlight %}
+  
+  
+Returns 1 if expression is rolled up in the current row’s 
grouping set, 0 otherwise.
+  
+
+
+
+  
+{% highlight scala %}
+ANY.groupingId()
+(ANY [, ANY ]*).groupingId()
--- End diff --

I think it is enough to be as close as possible to SQL syntax 
(`groupingId(ANY, ANY, ...)`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97314983
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
 ---
@@ -151,4 +155,35 @@ object ExpressionUtils {
   rexBuilder.makeExactLiteral(value))
   }
 
+  /** Computes the rollup of bit sets.
+*
+* For example, rollup({0}, {1})
--- End diff --

According to Scala style guide 
(http://docs.scala-lang.org/style/scaladoc.html):
"Use the wiki-style syntax instead of HTML wherever possible"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97298808
  
--- Diff: docs/dev/table_api.md ---
@@ -790,6 +790,40 @@ val result = in.groupBy('a).select('a, 'b.sum as 'd);
 
 
 
+  GroupingSets
+  
+Similar to a SQL GROUP BY GROUPING SETS clause. A GROUPING SETS 
expression allows to selectively specify the
+set of groups that you want to create within a GROUP BY clause.
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.groupingSets(('a, 'b), 'b, ()).select('a, 'b, 'c.sum as 
'd);
+{% endhighlight %}
+  
+
+
+
+  Cube
+  
+Similar to a SQL GROUP BY CUBE clause. A CUBE expression will 
generate subtotals for all combinations of the dimensions specified.
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.cube('a, 'b).select('a, 'b, 'c.sum as 'd);
+{% endhighlight %}
+  
+
+
+
+  Cube
--- End diff --

`Rollup`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97320762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -94,7 +94,24 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalNode) extend
   override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
 child.construct(relBuilder)
 relBuilder.project(
-  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map {
--- End diff --

I think this translation should rather happen in `resolveExpression` and 
needs to be reworked a bit. E.g. `groupId() as 'a as 'f` fails. Some inline 
comments would also be very good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97314389
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ---
@@ -176,6 +179,12 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val atom: PackratParser[Expression] =
 ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
 
+  lazy val grouped: PackratParser[Expression] =
+"(" ~> expressionList <~ ")" ^^ { l => new GroupedExpression(l.toSeq) }
+
+  lazy val unit: PackratParser[Expression] =
--- End diff --

Use a different name for this. My IDE shows warnings.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97317531
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/groupings.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+abstract sealed class GroupFunction extends Expression {
+
+  override def toString = s"GroupFunction($children)"
+
+  private[flink] def replaceExpression(
+relBuilder: RelBuilder,
+groupExpressions: Option[Seq[Expression]],
+children: Seq[Attribute] = Seq(),
+indicator: Boolean = false): Expression = {
+
+if (groupExpressions.isDefined) {
+  val expressions = groupExpressions.get
+  if (!indicator) {
+Cast(
--- End diff --

All arguments here are literals. We don't need to use `Expression` 
operations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97314552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ---
@@ -375,15 +393,26 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val prefixFlattening: PackratParser[Expression] =
 FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
 
+  lazy val prefixGrouping: PackratParser[Expression] =
+GROUPING ~ "(" ~> composite <~ ")" ^^ { e => Grouping(e) }
+
+  lazy val prefixGroupingId: PackratParser[Expression] =
+GROUPING_ID ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { l => 
GroupingId(l: _*) }
+
+  lazy val prefixGroupId: PackratParser[Expression] =
+GROUP_ID ~ opt("()") ^^ { _ => GroupId() }
+
   lazy val prefixed: PackratParser[Expression] =
 prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | 
prefixAvg |
-  prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim | 
prefixTrimWithoutArgs |
-  prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet | 
prefixFlattening |
+  prefixStart | prefixEnd |
+  prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | 
prefixIf | prefixExtract |
+  prefixFloor | prefixCeil | prefixGet | prefixFlattening |
+  prefixGroupingId | prefixGrouping | prefixGroupId |
   prefixFunctionCall | prefixFunctionCallOneArg // function call must 
always be at the end
 
   // suffix/prefix composite
 
-  lazy val composite: PackratParser[Expression] = suffixed | prefixed | 
atom |
+  lazy val composite: PackratParser[Expression] = suffixed | prefixed | 
atom | grouped |
--- End diff --

The `grouped` in `expression` should be enough. We don't need it here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97312088
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
 ---
@@ -86,3 +85,44 @@ abstract class UnaryExpression extends Expression {
 abstract class LeafExpression extends Expression {
   private[flink] val children = Nil
 }
+
+class GroupedExpression(
+private[flink] val children: Seq[Expression]
+  ) extends Expression {
+
+  def this(product: Product) {
+this(
+  product.productIterator
+.map {
+  case e: Expression => e
+  case s: Symbol => UnresolvedFieldReference(s.name)
+  case p: Product => new GroupedExpression(p)
+  case _ => throw new IllegalArgumentException()
--- End diff --

Please throw an `TableException` here and give a meaningful message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97311360
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -928,5 +1032,151 @@ class GroupWindowedTable(
 val fieldExprs = ExpressionParser.parseExpressionList(fields)
 select(fieldExprs: _*)
   }
+}
+
+/**
+  * A table that has been grouped on several sets of grouping keys.
+  */
+class GroupingSetsTable(
--- End diff --

In order to reduce the code complexity, we should adapt `GroupedTable` and 
do a case distinction there. 4 kinds of grouped tables are too much. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97313779
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ---
@@ -375,15 +393,26 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val prefixFlattening: PackratParser[Expression] =
 FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
 
+  lazy val prefixGrouping: PackratParser[Expression] =
--- End diff --

If you add `grouping`, `groupingId` and `groupId` to the 
`FunctionCatalog.builtInFunctions`. We don't need the following lines.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97324083
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 ---
@@ -408,10 +408,20 @@ public static void 
compareKeyValuePairsWithDelta(String expectedLines, String re
}
}
 
+   public static > void 
compareResultCollections(List expected, List actual) {
--- End diff --

Please try to avoid modifying classes outside of `flink-table` unless 
really necessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97307329
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -15,14 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api
 
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.logical.Minus
-import org.apache.flink.table.expressions.{Alias, Asc, Call, Expression, 
ExpressionParser, Ordering, TableFunctionCall, UnresolvedAlias}
+import org.apache.flink.table.plan.logical.{Minus => MinusNode}
+import org.apache.flink.table.expressions._
--- End diff --

Is this import and the renaming of Minus really necessary?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97323770
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupingSetsTest.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class GroupingSetsTest {
--- End diff --

Because we need to reduce the number of ITCases for improving the built 
time, I propose a different test structure: Add a 
`org.apache.flink.table.runtime.dataset.DataSetAggregateITCase` that tests 
grouping sets (not cube or rollup) in Scala Table API. The remaining tests can 
be logical unit tests for Java Table API, Scala Table API and SQL API. You can 
find additional information how we test here: 
https://issues.apache.org/jira/browse/FLINK-3656


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97312771
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
 ---
@@ -86,3 +85,44 @@ abstract class UnaryExpression extends Expression {
 abstract class LeafExpression extends Expression {
   private[flink] val children = Nil
 }
+
+class GroupedExpression(
+private[flink] val children: Seq[Expression]
+  ) extends Expression {
+
+  def this(product: Product) {
+this(
+  product.productIterator
+.map {
+  case e: Expression => e
+  case s: Symbol => UnresolvedFieldReference(s.name)
+  case p: Product => new GroupedExpression(p)
+  case _ => throw new IllegalArgumentException()
+}.toSeq
+)
+  }
+
+  def flatChildren: Seq[Expression] = {
+children.flatMap {
+  case g: GroupedExpression => g.flatChildren
+  case x => Seq(x)
+}
+  }
+
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It's not applicable for grouped expressions.
+*/
+  override private[flink] def resultType = ???
+
+  /**
+* Grouping function. Similar to a SQL GROUPING_ID function.
+*/
+  def groupingId(): Expression = GroupingId(children: _*)
--- End diff --

Remove this function as mentioned previously.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97304886
  
--- Diff: docs/dev/table_api.md ---
@@ -2714,6 +2748,42 @@ ARRAY.element()
   
 
 
+
--- End diff --

Can you add a `Grouping functions` section at the same position than in SQL 
functions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97312920
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
 ---
@@ -86,3 +85,44 @@ abstract class UnaryExpression extends Expression {
 abstract class LeafExpression extends Expression {
   private[flink] val children = Nil
 }
+
+class GroupedExpression(
+private[flink] val children: Seq[Expression]
+  ) extends Expression {
+
+  def this(product: Product) {
+this(
+  product.productIterator
+.map {
+  case e: Expression => e
+  case s: Symbol => UnresolvedFieldReference(s.name)
+  case p: Product => new GroupedExpression(p)
+  case _ => throw new IllegalArgumentException()
+}.toSeq
+)
+  }
+
+  def flatChildren: Seq[Expression] = {
+children.flatMap {
+  case g: GroupedExpression => g.flatChildren
+  case x => Seq(x)
+}
+  }
+
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It's not applicable for grouped expressions.
+*/
+  override private[flink] def resultType = ???
+
+  /**
+* Grouping function. Similar to a SQL GROUPING_ID function.
+*/
+  def groupingId(): Expression = GroupingId(children: _*)
+
+  override def productElement(n: Int): Expression = children(n)
--- End diff --

Are these `Product` functions not implemented already in super classes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97306096
  
--- Diff: docs/dev/table_api.md ---
@@ -2714,6 +2748,42 @@ ARRAY.element()
   
 
 
+
+  
+{% highlight scala %}
+groupId()
+{% endhighlight %}
+  
+  
+Returns an integer that uniquely identifies the combination of 
grouping keys.
+  
+
+
+
+  
+{% highlight scala %}
+ANY.grouping()
+grouping(ANY)
--- End diff --

In order to reduce the code complexity, I think it is enough if we only 
support the `grouping(...)` syntax.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97322915
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -616,6 +706,118 @@ case class WindowAggregate(
   }
 }
 
+case class GroupingWindowAggregate(
--- End diff --

Please remove this class for now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97313160
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ---
@@ -289,12 +298,21 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val suffixFlattening: PackratParser[Expression] =
 composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
 
+  lazy val suffixGrouping: PackratParser[Expression] =
--- End diff --

Only support prefix notation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97298700
  
--- Diff: docs/dev/table_api.md ---
@@ -790,6 +790,40 @@ val result = in.groupBy('a).select('a, 'b.sum as 'd);
 
 
 
+  GroupingSets
+  
+Similar to a SQL GROUP BY GROUPING SETS clause. A GROUPING SETS 
expression allows to selectively specify the
+set of groups that you want to create within a GROUP BY clause.
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.groupingSets(('a, 'b), 'b, ()).select('a, 'b, 'c.sum as 
'd);
+{% endhighlight %}
+  
+
+
+
+  Cube
+  
+Similar to a SQL GROUP BY CUBE clause. A CUBE expression will 
generate subtotals for all combinations of the dimensions specified.
--- End diff --

Would be great if you could also add an example. E.g. `.cube('a, 'b)` is 
equivalent to `.groupingSets(...)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97305200
  
--- Diff: docs/dev/table_api.md ---
@@ -2714,6 +2748,42 @@ ARRAY.element()
   
 
 
+
+  
+{% highlight scala %}
+groupId()
+{% endhighlight %}
+  
+  
+Returns an integer that uniquely identifies the combination of 
grouping keys.
+  
+
+
+
+  
+{% highlight scala %}
+ANY.grouping()
+grouping(ANY)
+{% endhighlight %}
+  
+  
+Returns 1 if expression is rolled up in the current row’s 
grouping set, 0 otherwise.
+  
+
+
+
+  
+{% highlight scala %}
+ANY.groupingId()
+(ANY [, ANY ]*).groupingId()
+groupingId(ANY [, ANY ]*)
+{% endhighlight %}
+  
+  
+Returns a bit vector of the given grouping expressions.
+  
+
+
--- End diff --

Add the same documentation to the Java section.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97322831
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -264,6 +281,79 @@ case class Aggregate(
   }
 }
 
+case class GroupingAggregation(
--- End diff --

I wonder if we could merge this class with `Aggregate`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97310709
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -928,5 +1032,151 @@ class GroupWindowedTable(
 val fieldExprs = ExpressionParser.parseExpressionList(fields)
 select(fieldExprs: _*)
   }
+}
+
+/**
+  * A table that has been grouped on several sets of grouping keys.
+  */
+class GroupingSetsTable(
+  private[flink] val table: Table,
+  private[flink] val groups: Seq[Seq[Expression]],
+  private[flink] val sqlKind: SqlKind) {
--- End diff --

I think we don't need this parameter if the translation is done in `cube()` 
and `rollup()` already.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97311604
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -928,5 +1032,151 @@ class GroupWindowedTable(
 val fieldExprs = ExpressionParser.parseExpressionList(fields)
 select(fieldExprs: _*)
   }
+}
+
+/**
+  * A table that has been grouped on several sets of grouping keys.
+  */
+class GroupingSetsTable(
+  private[flink] val table: Table,
+  private[flink] val groups: Seq[Seq[Expression]],
+  private[flink] val sqlKind: SqlKind) {
+
+  /**
+* Performs a selection operation on a grouped table. Similar to an SQL 
SELECT statement.
+* The field expressions can contain complex expressions and 
aggregations.
+*
+* Example:
+*
+* {{{
+*   tab.groupingSets('key).select('key, 'value.avg + " The average" as 
'average)
+* }}}
+*/
+  def select(fields: Expression*): Table = {
+
+val (aggNames, propNames) = extractAggregationsAndProperties(fields, 
table.tableEnv)
+
+if (propNames.nonEmpty) {
+  throw ValidationException("Window properties can only be used on 
windowed tables.")
+}
+
+val groupingSets = sqlKind match {
+  case SqlKind.CUBE => ExpressionUtils.cube(groups)
+  case SqlKind.ROLLUP => ExpressionUtils.rollup(groups)
+  case _ => groups
+}
+
+val projectsOnAgg = replaceAggregationsAndProperties(
+  fields, table.tableEnv, aggNames, propNames)
+val projectFields = extractFieldReferences(fields ++ 
groupingSets.flatten.distinct)
+
+val logical =
+  Project(projectsOnAgg,
+GroupingAggregation(groupingSets, aggNames.map(a => Alias(a._1, 
a._2)).toSeq,
+Project(projectFields, 
table.logicalPlan).validate(table.tableEnv)
+).validate(table.tableEnv)
+  ).validate(table.tableEnv)
+
+new Table(table.tableEnv, logical)
+  }
+
+  /**
+* Performs a selection operation on a grouped table. Similar to an SQL 
SELECT statement.
+* The field expressions can contain complex expressions and 
aggregations.
+*
+* Example:
+*
+* {{{
+*   tab.groupBy("key").select("key, value.avg + ' The average' as 
average")
+* }}}
+*/
+  def select(fields: String): Table = {
+val fieldExprs = ExpressionParser.parseExpressionList(fields)
+select(fieldExprs: _*)
+  }
 
+  /**
+* Groups the records of a table by assigning them to windows defined 
by a time or row interval.
+*
+* For streaming tables of infinite size, grouping into windows is 
required to define finite
+* groups on which group-based aggregates can be computed.
+*
+* For batch tables of finite size, windowing essentially provides 
shortcuts for time-based
+* groupBy.
+*
+* @param groupWindow group-window that specifies how elements are 
grouped.
+* @return A windowed table.
+*/
+  def window(groupWindow: GroupWindow): GroupingSetsWindowedTable = {
+if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
+  throw new ValidationException(s"Windows on batch tables are 
currently not supported.")
+}
+new GroupingSetsWindowedTable(table, groups, sqlKind, groupWindow)
+  }
+}
+
+class GroupingSetsWindowedTable(
--- End diff --

We decided to not support grouping sets in a stream environment yet and 
there also no tests for it. Could you remove this class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97312299
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
 ---
@@ -86,3 +85,44 @@ abstract class UnaryExpression extends Expression {
 abstract class LeafExpression extends Expression {
   private[flink] val children = Nil
 }
+
+class GroupedExpression(
+private[flink] val children: Seq[Expression]
+  ) extends Expression {
+
+  def this(product: Product) {
+this(
+  product.productIterator
+.map {
+  case e: Expression => e
+  case s: Symbol => UnresolvedFieldReference(s.name)
+  case p: Product => new GroupedExpression(p)
+  case _ => throw new IllegalArgumentException()
+}.toSeq
+)
+  }
+
+  def flatChildren: Seq[Expression] = {
+children.flatMap {
+  case g: GroupedExpression => g.flatChildren
+  case x => Seq(x)
+}
+  }
+
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It's not applicable for grouped expressions.
+*/
+  override private[flink] def resultType = ???
--- End diff --

This should be a `UnsupportedOperationException` with a meaningful message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2017-01-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3026#discussion_r97308739
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
 ---
@@ -66,7 +66,7 @@ import _root_.scala.reflect.ClassTag
   * }}}
   *
   */
-package object scala extends ImplicitExpressionConversions {
+package object scala extends ImplicitExpressionConversions with 
ImplicitGroupedConversions {
--- End diff --

You can add the contents of `ImplicitGroupedConversions` to 
`ImplicitExpressionConversions`. We don't need an additional class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

2016-12-19 Thread chermenin
GitHub user chermenin opened a pull request:

https://github.com/apache/flink/pull/3026

[FLINK-2980] [table] Support for GROUPING SETS clause in Table API.

Support for operators GROUPING SETS / ROLLUP / CUBE in Table AP was added 
in this PR.
Also added some tests for check execution of SQL queries with them and 
improved documentation.
PR will close next issue: https://issues.apache.org/jira/browse/FLINK-2980.
This PR must be reviewed and merged only after PR #2976.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chermenin/flink flink-2980

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3026.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3026


commit d88bd67c82249a2533d58b1ff231adb1a441e6a3
Author: Alexander Chermenin 
Date:   2016-12-10T20:09:25Z

[FLINK-2980] Base implementation of grouping sets.

commit 70c12a240b85333ab3a801e2f4f6ee6d1e961427
Author: Aleksandr Chermenin 
Date:   2016-12-12T08:58:17Z

[FLINK-2980] Implemented grouped expressions.

commit de15fdddb35fac5002ae76ec9cec6aa14c9ffef0
Author: Aleksandr Chermenin 
Date:   2016-12-12T10:18:42Z

[FLINK-2980] Added grouping functions.

commit 5468db9fd39c26e752554445af5cd2a0e1da5aae
Author: Aleksandr Chermenin 
Date:   2016-12-12T11:39:09Z

[FLINK-2980] Improved expressions parser.

commit e6aa37255695e0d23d7df43934a88ff53737d303
Author: Aleksandr Chermenin 
Date:   2016-12-12T12:05:25Z

[FLINK-2980] Added support for grouping functions.

commit a3d30f39b91ee85d76da4e034b20e6f63ef50fd7
Author: Aleksandr Chermenin 
Date:   2016-12-12T12:54:19Z

[FLINK-2980] Small fixes.

commit 8e8d7e16e12c3f50c4a93a1961e975860fb0e4a5
Author: Aleksandr Chermenin 
Date:   2016-12-12T13:32:57Z

[FLINK-2980] Windowed table with grouping sets.

commit b13382b3328e1b244dae624c45c5c01580d80098
Author: Aleksandr Chermenin 
Date:   2016-12-15T10:13:45Z

[FLINK-2980] Added tests.

commit 6f5371882bf354b73587d4a7f9e526c911794eba
Author: Aleksandr Chermenin 
Date:   2016-12-15T10:27:44Z

[FLINK-2980] Small fixes.

commit 36ce2bf704a8798f398719b634c30003765c7bbf
Author: Aleksandr Chermenin 
Date:   2016-12-16T08:12:07Z

[FLINK-2980] Improved documentation.

commit 8d2824f37ebb7215fb901fc10a868717f424eb56
Author: Aleksandr Chermenin 
Date:   2016-12-16T09:42:30Z

[FLINK-2980] Small docs improvements.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---