[1/4] flink git commit: [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).

2017-05-18 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.3 45923ffb8 -> 51fb7ed79


[FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., 
TUMBLE_END).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51fb7ed7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51fb7ed7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51fb7ed7

Branch: refs/heads/release-1.3
Commit: 51fb7ed791bc8c5c1c35dffcd9855a2e5a8f3087
Parents: 0246ce5
Author: Fabian Hueske 
Authored: Wed May 17 16:26:27 2017 +0200
Committer: Fabian Hueske 
Committed: Thu May 18 22:06:00 2017 +0200

--
 .../common/WindowStartEndPropertiesRule.scala   | 39 ++--
 .../scala/stream/sql/WindowAggregateTest.scala  | 27 ++
 2 files changed, 54 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 7577deb..14e9b21 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule
   override def matches(call: RelOptRuleCall): Boolean = {
 val project = call.rel(0).asInstanceOf[LogicalProject]
 // project includes at least on group auxiliary function
-project.getProjects.exists {
-  case c: RexCall => c.getOperator.isGroupAuxiliary
-  case _ => false
+
+def hasGroupAuxiliaries(node: RexNode): Boolean = {
+  node match {
+case c: RexCall if c.getOperator.isGroupAuxiliary => true
+case c: RexCall =>
+  c.operands.exists(hasGroupAuxiliaries)
+case _ => false
+  }
 }
+
+project.getProjects.exists(hasGroupAuxiliaries)
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
+
 val project = call.rel(0).asInstanceOf[LogicalProject]
 val innerProject = call.rel(1).asInstanceOf[LogicalProject]
 val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
@@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule
 transformed.project(
   innerProject.getProjects ++ Seq(transformed.field("w$start"), 
transformed.field("w$end")))
 
-// replace window auxiliary function by access to window properties
-transformed.project(
-  project.getProjects.map{ x =>
-if (WindowStartEndPropertiesRule.isWindowStart(x)) {
+def replaceGroupAuxiliaries(node: RexNode): RexNode = {
+  node match {
+case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
   // replace expression by access to window start
-  rexBuilder.makeCast(x.getType, transformed.field("w$start"), false)
-} else if (WindowStartEndPropertiesRule.isWindowEnd(x)) {
+  rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
+case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
   // replace expression by access to window end
-  rexBuilder.makeCast(x.getType, transformed.field("w$end"), false)
-} else {
+  rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
+case c: RexCall =>
+  // replace expressions in children
+  val newOps = c.getOperands.map(replaceGroupAuxiliaries)
+  c.clone(c.getType, newOps)
+case x =>
   // preserve expression
   x
-}
   }
+}
+
+// replace window auxiliary function by access to window properties
+transformed.project(
+  project.getProjects.map(replaceGroupAuxiliaries)
 )
 val res = transformed.build()
 call.transformTo(res)

http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 2022db8..f95d0ab 100644
--- 

[1/4] flink git commit: [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).

2017-05-18 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master c995ebd29 -> 9fc42df68


[FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., 
TUMBLE_END).

This closes #3930.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fc42df6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fc42df6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fc42df6

Branch: refs/heads/master
Commit: 9fc42df68d746c633b0d3c8995e0031064bfd362
Parents: 3a65e5a
Author: Fabian Hueske 
Authored: Wed May 17 16:26:27 2017 +0200
Committer: Fabian Hueske 
Committed: Thu May 18 21:22:12 2017 +0200

--
 .../common/WindowStartEndPropertiesRule.scala   | 39 ++--
 .../scala/stream/sql/WindowAggregateTest.scala  | 27 ++
 2 files changed, 54 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 7577deb..14e9b21 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule
   override def matches(call: RelOptRuleCall): Boolean = {
 val project = call.rel(0).asInstanceOf[LogicalProject]
 // project includes at least on group auxiliary function
-project.getProjects.exists {
-  case c: RexCall => c.getOperator.isGroupAuxiliary
-  case _ => false
+
+def hasGroupAuxiliaries(node: RexNode): Boolean = {
+  node match {
+case c: RexCall if c.getOperator.isGroupAuxiliary => true
+case c: RexCall =>
+  c.operands.exists(hasGroupAuxiliaries)
+case _ => false
+  }
 }
+
+project.getProjects.exists(hasGroupAuxiliaries)
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
+
 val project = call.rel(0).asInstanceOf[LogicalProject]
 val innerProject = call.rel(1).asInstanceOf[LogicalProject]
 val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
@@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule
 transformed.project(
   innerProject.getProjects ++ Seq(transformed.field("w$start"), 
transformed.field("w$end")))
 
-// replace window auxiliary function by access to window properties
-transformed.project(
-  project.getProjects.map{ x =>
-if (WindowStartEndPropertiesRule.isWindowStart(x)) {
+def replaceGroupAuxiliaries(node: RexNode): RexNode = {
+  node match {
+case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
   // replace expression by access to window start
-  rexBuilder.makeCast(x.getType, transformed.field("w$start"), false)
-} else if (WindowStartEndPropertiesRule.isWindowEnd(x)) {
+  rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
+case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
   // replace expression by access to window end
-  rexBuilder.makeCast(x.getType, transformed.field("w$end"), false)
-} else {
+  rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
+case c: RexCall =>
+  // replace expressions in children
+  val newOps = c.getOperands.map(replaceGroupAuxiliaries)
+  c.clone(c.getType, newOps)
+case x =>
   // preserve expression
   x
-}
   }
+}
+
+// replace window auxiliary function by access to window properties
+transformed.project(
+  project.getProjects.map(replaceGroupAuxiliaries)
 )
 val res = transformed.build()
 call.transformTo(res)

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 2022db8..f95d0ab 100644
---