Repository: flink Updated Branches: refs/heads/master 804430bdf -> 7805db813
[FLINK-2000][table] Add SQL-style Aggregation Support Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7805db81 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7805db81 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7805db81 Branch: refs/heads/master Commit: 7805db813dd744f13776320d556e1cefa0351464 Parents: 804430b Author: Cheng Hao <chha...@gmail.com> Authored: Thu Jun 4 23:17:35 2015 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jun 8 15:37:15 2015 +0200 ---------------------------------------------------------------------- .../api/table/parser/ExpressionParser.scala | 20 ++++++++++++++----- .../table/test/GroupedAggreagationsITCase.scala | 21 ++++++++++++++++++++ 2 files changed, 36 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7805db81/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala index 1586f50..7bad7fe 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala @@ -43,6 +43,11 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { // KeyWord lazy val AS: Keyword = Keyword("as") + lazy val COUNT: Keyword = Keyword("count") + lazy val AVG: Keyword = Keyword("avg") + lazy val MIN: Keyword = Keyword("min") + lazy val MAX: Keyword = Keyword("max") + lazy val SUM: Keyword = Keyword("sum") // Literals @@ -91,11 +96,16 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) } - lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) } - lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) } - lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) } - lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => Count(e) } - lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) } + lazy val sum: PackratParser[Expression] = + (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) }) + lazy val min: PackratParser[Expression] = + (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) }) + lazy val max: PackratParser[Expression] = + (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) }) + lazy val count: PackratParser[Expression] = + (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) }) + lazy val avg: PackratParser[Expression] = + (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) }) lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ { case e ~ _ ~ as ~ _ => Naming(e, as.name) http://git-wip-us.apache.org/repos/asf/flink/blob/7805db81/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala index d76d75c..5afd6ca 100644 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala +++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala @@ -93,4 +93,25 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra env.execute() expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n" } + + @Test + def testSQLStyleAggregations(): Unit = { + + // the grouping key needs to be forwarded to the intermediate DataSet, even + // if we don't want the key in the output + + val env = ExecutionEnvironment.getExecutionEnvironment + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + .select( + """Sum( a) as a1, a.sum as a2, + |Min (a) as b1, a.min as b2, + |Max (a ) as c1, a.max as c2, + |Avg ( a ) as d1, a.avg as d2, + |Count(a) as e1, a.count as e2 + """.stripMargin) + + ds.writeAsText(resultPath, WriteMode.OVERWRITE) + env.execute() + expected = "231,231,1,1,21,21,11,11,21,21" + } }