flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule

2017-11-16 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.4 e7f7d0c93 -> 42e24413b


[FLINK-7986] [table] Introduce FilterSetOpTransposeRule

This closes #4956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42e24413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42e24413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42e24413

Branch: refs/heads/release-1.4
Commit: 42e24413b5a47928e06f2a61086f7559370c65d8
Parents: e7f7d0c
Author: Xpray 
Authored: Mon Nov 6 23:47:33 2017 +0800
Committer: twalthr 
Committed: Thu Nov 16 16:16:22 2017 +0100

--
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  2 +
 .../api/batch/table/SetOperatorsTest.scala  | 80 
 .../api/stream/table/SetOperatorsTest.scala | 68 +
 3 files changed, 150 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/42e24413/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index dcc735d..a20d14f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -52,6 +52,8 @@ object FlinkRuleSets {
 FilterJoinRule.JOIN,
 // push filter through an aggregation
 FilterAggregateTransposeRule.INSTANCE,
+// push filter through set operation
+FilterSetOpTransposeRule.INSTANCE,
 
 // aggregation and projection rules
 AggregateProjectMergeRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/42e24413/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 2d4e205..35f4429 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase {
 
 util.verifyJavaTable(in, expected)
   }
+
+  @Test
+  def testFilterUnionTranspose(): Unit = {
+val util = batchTestUtil()
+val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+val result = left.unionAll(right)
+  .where('a > 0)
+  .groupBy('b)
+  .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  unaryNode(
+"DataSetAggregate",
+binaryNode(
+  "DataSetUnion",
+  unaryNode(
+"DataSetCalc",
+batchTableNode(0),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  unaryNode(
+"DataSetCalc",
+batchTableNode(1),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  term("union", "a", "b", "c")
+),
+term("groupBy", "b"),
+term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+  ),
+  term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+)
+
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilterMinusTranspose(): Unit = {
+val util = batchTestUtil()
+val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+val result = left.minusAll(right)
+  .where('a > 0)
+  .groupBy('b)
+  .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  unaryNode(
+"DataSetAggregate",
+binaryNode(
+  "DataSetMinus",
+  unaryNode(
+"DataSetCalc",
+batchTableNode(0),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  unaryNode(
+"DataSetCalc",
+batchTableNode(1),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  term("minus", "a", "b", "c")
+),
+term("groupBy", "b"),
+  

flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule

2017-11-16 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master cd1fbc078 -> 81dc260dc


[FLINK-7986] [table] Introduce FilterSetOpTransposeRule

This closes #4956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81dc260d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81dc260d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81dc260d

Branch: refs/heads/master
Commit: 81dc260dc653085b9dbf098e8fd70a72d2d0828e
Parents: cd1fbc0
Author: Xpray 
Authored: Mon Nov 6 23:47:33 2017 +0800
Committer: twalthr 
Committed: Thu Nov 16 14:43:50 2017 +0100

--
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  2 +
 .../api/batch/table/SetOperatorsTest.scala  | 80 
 .../api/stream/table/SetOperatorsTest.scala | 68 +
 3 files changed, 150 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index dcc735d..a20d14f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -52,6 +52,8 @@ object FlinkRuleSets {
 FilterJoinRule.JOIN,
 // push filter through an aggregation
 FilterAggregateTransposeRule.INSTANCE,
+// push filter through set operation
+FilterSetOpTransposeRule.INSTANCE,
 
 // aggregation and projection rules
 AggregateProjectMergeRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 2d4e205..35f4429 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase {
 
 util.verifyJavaTable(in, expected)
   }
+
+  @Test
+  def testFilterUnionTranspose(): Unit = {
+val util = batchTestUtil()
+val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+val result = left.unionAll(right)
+  .where('a > 0)
+  .groupBy('b)
+  .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  unaryNode(
+"DataSetAggregate",
+binaryNode(
+  "DataSetUnion",
+  unaryNode(
+"DataSetCalc",
+batchTableNode(0),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  unaryNode(
+"DataSetCalc",
+batchTableNode(1),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  term("union", "a", "b", "c")
+),
+term("groupBy", "b"),
+term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+  ),
+  term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+)
+
+util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilterMinusTranspose(): Unit = {
+val util = batchTestUtil()
+val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+val result = left.minusAll(right)
+  .where('a > 0)
+  .groupBy('b)
+  .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+val expected = unaryNode(
+  "DataSetCalc",
+  unaryNode(
+"DataSetAggregate",
+binaryNode(
+  "DataSetMinus",
+  unaryNode(
+"DataSetCalc",
+batchTableNode(0),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  unaryNode(
+"DataSetCalc",
+batchTableNode(1),
+term("select", "a", "b", "c"),
+term("where", ">(a, 0)")
+  ),
+  term("minus", "a", "b", "c")
+),
+term("groupBy", "b"),
+