Repository: flink
Updated Branches:
  refs/heads/master 804430bdf -> 7805db813


[FLINK-2000][table] Add SQL-style Aggregation Support


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7805db81
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7805db81
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7805db81

Branch: refs/heads/master
Commit: 7805db813dd744f13776320d556e1cefa0351464
Parents: 804430b
Author: Cheng Hao <chha...@gmail.com>
Authored: Thu Jun 4 23:17:35 2015 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Jun 8 15:37:15 2015 +0200

----------------------------------------------------------------------
 .../api/table/parser/ExpressionParser.scala     | 20 ++++++++++++++-----
 .../table/test/GroupedAggreagationsITCase.scala | 21 ++++++++++++++++++++
 2 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7805db81/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index 1586f50..7bad7fe 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -43,6 +43,11 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   // KeyWord
 
   lazy val AS: Keyword = Keyword("as")
+  lazy val COUNT: Keyword = Keyword("count")
+  lazy val AVG: Keyword = Keyword("avg")
+  lazy val MIN: Keyword = Keyword("min")
+  lazy val MAX: Keyword = Keyword("max")
+  lazy val SUM: Keyword = Keyword("sum")
 
   // Literals
 
@@ -91,11 +96,16 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }
 
-  lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) }
-  lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) }
-  lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) }
-  lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => 
Count(e) }
-  lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) }
+  lazy val sum: PackratParser[Expression] =
+    (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => 
Sum(e) })
+  lazy val min: PackratParser[Expression] =
+    (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => 
Min(e) })
+  lazy val max: PackratParser[Expression] =
+    (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => 
Max(e) })
+  lazy val count: PackratParser[Expression] =
+    (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { 
e => Count(e) })
+  lazy val avg: PackratParser[Expression] =
+    (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => 
Avg(e) })
 
   lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ 
")" ^^ {
     case e ~ _ ~ as ~ _ => Naming(e, as.name)

http://git-wip-us.apache.org/repos/asf/flink/blob/7805db81/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index d76d75c..5afd6ca 100644
--- 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -93,4 +93,25 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) 
extends MultipleProgra
     env.execute()
     expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
   }
+
+  @Test
+  def testSQLStyleAggregations(): Unit = {
+
+    // each SQL-style call (e.g. "Sum(a)") is paired with its postfix form (e.g. "a.sum");
+    // both are applied to the whole, ungrouped table and must yield the same value
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .select(
+        """Sum( a) as a1, a.sum as a2,
+          |Min (a) as b1, a.min as b2,
+          |Max (a ) as c1, a.max as c2,
+          |Avg ( a ) as d1, a.avg as d2,
+          |Count(a) as e1, a.count as e2
+        """.stripMargin)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "231,231,1,1,21,21,11,11,21,21"
+  }
 }

Reply via email to