[1/4] flink git commit: [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).
Repository: flink Updated Branches: refs/heads/release-1.3 45923ffb8 -> 51fb7ed79 [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51fb7ed7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51fb7ed7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51fb7ed7 Branch: refs/heads/release-1.3 Commit: 51fb7ed791bc8c5c1c35dffcd9855a2e5a8f3087 Parents: 0246ce5 Author: Fabian HueskeAuthored: Wed May 17 16:26:27 2017 +0200 Committer: Fabian Hueske Committed: Thu May 18 22:06:00 2017 +0200 -- .../common/WindowStartEndPropertiesRule.scala | 39 ++-- .../scala/stream/sql/WindowAggregateTest.scala | 27 ++ 2 files changed, 54 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala index 7577deb..14e9b21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala @@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule override def matches(call: RelOptRuleCall): Boolean = { val project = call.rel(0).asInstanceOf[LogicalProject] // project includes at least on group auxiliary function -project.getProjects.exists { - case c: RexCall => c.getOperator.isGroupAuxiliary - case _ => false + +def hasGroupAuxiliaries(node: RexNode): Boolean = { + node match { +case c: RexCall if c.getOperator.isGroupAuxiliary => true +case c: RexCall => + c.operands.exists(hasGroupAuxiliaries) +case _ => false + } } + +project.getProjects.exists(hasGroupAuxiliaries) } override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] @@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule transformed.project( innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) -// replace window auxiliary function by access to window properties -transformed.project( - project.getProjects.map{ x => -if (WindowStartEndPropertiesRule.isWindowStart(x)) { +def replaceGroupAuxiliaries(node: RexNode): RexNode = { + node match { +case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => // replace expression by access to window start - rexBuilder.makeCast(x.getType, transformed.field("w$start"), false) -} else if (WindowStartEndPropertiesRule.isWindowEnd(x)) { + rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) +case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => // replace expression by access to window end - rexBuilder.makeCast(x.getType, transformed.field("w$end"), false) -} else { + rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) +case c: RexCall => + // replace expressions in children + val newOps = c.getOperands.map(replaceGroupAuxiliaries) + c.clone(c.getType, newOps) +case x => // preserve expression x -} } +} + +// replace window auxiliary function by access to window properties +transformed.project( + project.getProjects.map(replaceGroupAuxiliaries) ) val res = transformed.build() call.transformTo(res) http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 2022db8..f95d0ab 100644 ---
[1/4] flink git commit: [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).
Repository: flink Updated Branches: refs/heads/master c995ebd29 -> 9fc42df68 [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END). This closes #3930. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fc42df6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fc42df6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fc42df6 Branch: refs/heads/master Commit: 9fc42df68d746c633b0d3c8995e0031064bfd362 Parents: 3a65e5a Author: Fabian HueskeAuthored: Wed May 17 16:26:27 2017 +0200 Committer: Fabian Hueske Committed: Thu May 18 21:22:12 2017 +0200 -- .../common/WindowStartEndPropertiesRule.scala | 39 ++-- .../scala/stream/sql/WindowAggregateTest.scala | 27 ++ 2 files changed, 54 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala index 7577deb..14e9b21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala @@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule override def matches(call: RelOptRuleCall): Boolean = { val project = call.rel(0).asInstanceOf[LogicalProject] // project includes at least on group auxiliary function -project.getProjects.exists { - case c: RexCall => c.getOperator.isGroupAuxiliary - case _ => false + +def hasGroupAuxiliaries(node: RexNode): Boolean = { + node match { +case c: RexCall if c.getOperator.isGroupAuxiliary => true +case c: RexCall => + c.operands.exists(hasGroupAuxiliaries) +case _ => false + } } + +project.getProjects.exists(hasGroupAuxiliaries) } override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] @@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule transformed.project( innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) -// replace window auxiliary function by access to window properties -transformed.project( - project.getProjects.map{ x => -if (WindowStartEndPropertiesRule.isWindowStart(x)) { +def replaceGroupAuxiliaries(node: RexNode): RexNode = { + node match { +case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => // replace expression by access to window start - rexBuilder.makeCast(x.getType, transformed.field("w$start"), false) -} else if (WindowStartEndPropertiesRule.isWindowEnd(x)) { + rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) +case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => // replace expression by access to window end - rexBuilder.makeCast(x.getType, transformed.field("w$end"), false) -} else { + rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) +case c: RexCall => + // replace expressions in children + val newOps = c.getOperands.map(replaceGroupAuxiliaries) + c.clone(c.getType, newOps) +case x => // preserve expression x -} } +} + +// replace window auxiliary function by access to window properties +transformed.project( + project.getProjects.map(replaceGroupAuxiliaries) ) val res = transformed.build() call.transformTo(res) http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 2022db8..f95d0ab 100644 ---